multi: update ChanUpdatesInHorizon and NodeUpdatesInHorizon to return iterators (iter.Seq[T]) #10128
Conversation
Summary of Changes
Hello @Roasbeef, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant architectural shift in how large sets of channel and node updates are retrieved from the graph database. By migrating to Go's new iter.Seq[T] type, the system now supports streaming and batched data retrieval, leading to reduced memory consumption and improved performance. Furthermore, optimizations have been applied to reduce mutex contention during cache updates and to integrate public node filtering directly into the data fetching process, enhancing overall efficiency and concurrency.
Highlights
- API Refactor: Iterator-based Data Retrieval: The ChanUpdatesInHorizon and NodeUpdatesInHorizon functions have been updated to return iter.Seq[T] instead of slices. This enables streaming and batched retrieval of graph data, allowing for more efficient memory usage and early termination of data processing (see the sketch after this list).
- Performance Optimization: Batched Database Reads: Data is now fetched from both the KV and SQL databases in configurable batches. This reduces the memory footprint for large queries and improves overall performance by avoiding loading the entire dataset into memory at once.
- Concurrency Improvement: Reduced Mutex Contention: The cache update mechanism has been refined. Instead of holding a global cache mutex for the entire duration of a data retrieval operation, updates are now applied incrementally per batch. This significantly reduces the time the mutex is held, thereby decreasing contention and improving concurrency.
- Query Optimization: Integrated Public Node Filtering: Public node filtering, previously a post-retrieval step, has been integrated directly into the underlying database queries (for SQL) or the batch processing logic (for the KV store). This pushes filtering closer to the data source, improving efficiency.
- Go Version Update: The project's Go module version has been updated from 1.19 to 1.23 to support the new iter package, which is central to the API changes in this PR.
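As a rough illustration of the new API shape (the type and function names below are simplified stand-ins, not the exact identifiers from this PR), a caller can now range over the results and stop early without the store ever materializing the full result set:

```go
package main

import (
	"fmt"
	"iter"
)

// ChannelEdge is a simplified stand-in for the graph package's
// channel-update record.
type ChannelEdge struct {
	ChannelID uint64
}

// chanUpdatesInHorizon mimics the new iterator-returning API. The real
// method reads from the database in batches; here we yield from a fixed
// slice to keep the example self-contained.
func chanUpdatesInHorizon(edges []ChannelEdge) iter.Seq[ChannelEdge] {
	return func(yield func(ChannelEdge) bool) {
		for _, e := range edges {
			if !yield(e) {
				// The caller broke out of its range loop, so production
				// stops immediately (early termination).
				return
			}
		}
	}
}

func main() {
	edges := []ChannelEdge{{ChannelID: 1}, {ChannelID: 2}, {ChannelID: 3}}

	// Only the first two edges are ever produced; with the old
	// slice-returning API, the full result set would have been loaded up
	// front.
	count := 0
	for edge := range chanUpdatesInHorizon(edges) {
		fmt.Println("got channel", edge.ChannelID)
		count++
		if count == 2 {
			break
		}
	}
}
```

The key point is that the yield callback returning false lets the producer stop fetching further data as soon as the consumer breaks out of its loop.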
Code Review
This is a great refactoring to use iterators for fetching channel and node updates, which should significantly improve performance and reduce database transaction times for large graphs. The optimizations to the SQL queries are also a welcome improvement.
I've found a few minor issues, mostly related to style guide adherence (line length and typos in comments), and one more significant issue regarding an inconsistency in how time ranges are handled between the updated functions. Please take a look at the comments.
graph/db/interfaces.go
ChanUpdatesInHorizon(startTime, endTime time.Time,
	opts ...Option) (iter.Seq[ChannelEdge], error)
There seems to be an inconsistency in how the endTime parameter is handled between ChanUpdatesInHorizon and NodeUpdatesInHorizon.
- NodeUpdatesInHorizon is now inclusive of endTime in both the kv_store and sql_store implementations. This is a change from the previous behavior in the SQL store.
- ChanUpdatesInHorizon is now exclusive of endTime in both implementations. This is a change from the previous behavior in the kv_store.
This inconsistency in the API can be surprising and lead to bugs. For API consistency, it would be better if both functions handled the time range inclusively or exclusively in the same way. I'd recommend making both inclusive to match the new behavior of NodeUpdatesInHorizon.
@gemini-code-assist this should be resolved now, all of the implementations are now inclusive of the endTime. Can you check again?
Thanks for the update, @ziggie1984! I've re-checked the implementations for ChanUpdatesInHorizon and NodeUpdatesInHorizon.
I can confirm that NodeUpdatesInHorizon in both the kv_store and sql_store implementations, as well as ChanUpdatesInHorizon in the kv_store implementation, are now inclusive of the endTime parameter, which aligns with the goal of API consistency.
However, I noticed that ChanUpdatesInHorizon in the sql_store implementation still uses an exclusive comparison for endTime in its SQL query. This means it will not include updates that occurred exactly at the endTime.
To fully resolve the inconsistency and make all implementations inclusive of endTime, please update the GetChannelsByPolicyLastUpdateRange query in sqldb/sqlc/queries/graph.sql to use <= instead of < for the end_time parameter.
Severity: high
ChanUpdatesInHorizon(startTime, endTime time.Time,
	opts ...Option) (iter.Seq[ChannelEdge], error)

(cp1.last_update >= @start_time AND cp1.last_update <= @end_time)
OR
(cp2.last_update >= @start_time AND cp2.last_update <= @end_time)
Cool idea. Will we be able to thread this through to the higher levels? There would still be an outgoing bandwidth issue, but this would help with excessive memory use.
Yep, I realize now that I stopped just short of threading it up to that level. The missing change here would be updating
Isn't this effectively addressed via the
This all sounds great to me.
My main concern was that one peer could easily monopolize the outbound bandwidth since we would still send the requested gossip as fast as possible. But #10103 was merged yesterday, which I think addresses this concern.
@Roasbeef, remember to re-request review from reviewers when ready
Force-pushed from 422529f to 03177cd.
Pushed up some new commits that update the call chain from
Force-pushed from 03177cd to 6d3fb04.
Pushed a new rebased version. Working on some performance tests to show the impact of this PR.
Super cool new use of the Iterator, I think that's the way to go here, thank you for implementing this new design.
Alright, I've finished my experiments!
Serial Processing in Master
This was one of those classic cases where I created the experiment assuming one outcome, but gained more insight and arrived at another outcome in the process. The first relevant tidbit is that today in master, we have a mutex that guards access to this read path (see lines 2042 to 2055 in 82f77e5). What this means is that only a single goroutine/peer can actually read the backlog from disk at a time. So even though the default filter sema value is 5, only one goroutine can actually read at a time. As an aside, I first tried to just remove that mutex to see how it fared against my testing program, but it wasn't pretty: master would OOM in around 5 seconds flat. If it didn't OOM, it would just lock up, aggressively trying to GC, as the machine had no swap configured.
Lazy Concurrent Processing w/ this PR
Before I arrived at the above insight, my assumption was that this PR would lead to both lower CPU and memory utilization. That was nearly the case, but not as much as I envisioned. The hidden truth here was that this PR is able to serve concurrent clients (up to the filter sema) with lower CPU and memory than master, which can only serve a single client at a time. This branch was able to serve the same number of clients, but with lower mutex contention, fewer goroutines, and fewer total allocations. I can provide these profiles if y'all are curious, but the next section paints a better picture w.r.t. the gains of this PR.
Time-To-First-Message Experiment
In my experiment, I increased the filter sema value to 200 and ran with 50 concurrent clients. Each client would send a gossip filter, then wait until the first gossip message was received, record the latency, then repeat. I ran this for 5 minutes total. Master was able to process 437 iterations, while this PR could process 4768 iterations (a 10x improvement). For reference, the box I was using has a limited number of vCPUs, and my internet was a bit slow/wonky at times, so improving both would likely show a greater improvement. I generated a scatterplot, histogram, and CDF with the raw data.
You can also see some interesting interaction in the scatter plot for master: each client needs to wait on the next to release the mutex, so the waiting time steadily increases.
Impressive results: we can now serve way more syncers without any downsides 👌🫡 The graph cache mutex is also held for a much shorter time, nice!
Force-pushed from 6d3fb04 to 2b459be.
Very cool!
nodeAnns, hasMore, err := c.fetchNextNodeBatch(state)
if err != nil {
	log.Errorf("unable to read node updates in "+
		"horizon: %v", err)
	return
The big change here in terms of correctness is that we no longer do this DB read in the same transaction.
Since this method is used for gossip syncing, I think this is fine & it can just be "best-effort". But perhaps worth mentioning this in a comment.
Ah, you mean that what the "true" backlog is can shift between batch reads? Yeah, that's true, but I don't think consistency is super critical here. We have support for timestamps for query chan IDs now, so if someone really wants everything, then they can use that.
chain, startTime, endTime,
}

// We'll get the response from the channel, then yield it
The commit message doesn't match the diff.
Will revisit, had to do a pretty gnarly rebase to get all the commits compiling lol.
gtg, pending CI failures
Are we going to keep the SQL and kv backends consistent regarding the endTime inclusiveness?
LGTM
Force-pushed from 2b459be to d3c57c9.
Check commits is failing on CI, but passes for me locally. Perhaps it has something to do with it not handling the replace directive properly?
In this commit, we introduce a new utility function `Collect` to the fn package. This function drains all elements from an iterator and returns them as a slice. This is particularly useful when transitioning from iterator-based APIs to code that expects slices, allowing for gradual migration to the new iterator patterns. The fn module's go.mod is also updated to require Go 1.23, which is necessary for the built-in iter.Seq type support. The replace directive will be removed once the fn package changes are merged and a new version is tagged.
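As a rough sketch (assuming this signature; the actual fn.Collect implementation may differ in details), such a helper boils down to a single range loop:

```go
package fn

import "iter"

// Collect drains every element from the given iterator and returns the
// elements as a slice. This is handy while migrating callers that still
// expect slices over to the new iterator-based APIs.
func Collect[T any](seq iter.Seq[T]) []T {
	var items []T
	for item := range seq {
		items = append(items, item)
	}
	return items
}
```

Call sites that previously received a slice can wrap the new iterator in Collect and keep their existing logic unchanged, which is exactly the gradual-migration path the commit describes.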
In this commit, we introduce a new options pattern for configuring iterator behavior in the graph database. This includes configuration for batch sizes when iterating over channel and node updates, as well as an option to filter for public nodes only. The new functional options pattern allows callers to customize iterator behavior without breaking existing APIs. Default batch sizes are set to 1000 entries for both channel and node updates, which provides a good balance between memory usage and performance.
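Hypothetically, the options pattern described here could look something like the following (the type and option names are illustrative and not taken verbatim from the PR):

```go
package graphdb

// iterCfg holds the tunables for the horizon iterators. The defaults
// mirror the commit message: 1000-entry batches for both channel and node
// updates, with public-node filtering off unless requested.
type iterCfg struct {
	chanBatchSize int
	nodeBatchSize int
	publicOnly    bool
}

// Option is a functional option that tweaks the iterator config without
// changing the method signatures themselves.
type Option func(*iterCfg)

func defaultIterCfg() *iterCfg {
	return &iterCfg{
		chanBatchSize: 1000,
		nodeBatchSize: 1000,
	}
}

// WithChanBatchSize overrides the channel-update batch size.
func WithChanBatchSize(n int) Option {
	return func(c *iterCfg) {
		c.chanBatchSize = n
	}
}

// WithNodeBatchSize overrides the node-update batch size.
func WithNodeBatchSize(n int) Option {
	return func(c *iterCfg) {
		c.nodeBatchSize = n
	}
}

// WithPublicNodesOnly pushes the public-node filter down into the fetch
// itself rather than filtering after the fact.
func WithPublicNodesOnly() Option {
	return func(c *iterCfg) {
		c.publicOnly = true
	}
}

// newIterCfg applies caller-supplied options on top of the defaults.
func newIterCfg(opts ...Option) *iterCfg {
	cfg := defaultIterCfg()
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}
```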
In this commit, we refactor the NodeUpdatesInHorizon method to return an iterator instead of a slice. This change significantly reduces memory usage when dealing with large result sets by allowing callers to process items incrementally rather than loading everything into memory at once. The new implementation uses Go 1.23's iter.Seq type to provide a standard iterator interface. The method now supports configurable batch sizes through functional options, allowing fine-tuned control over memory usage and performance characteristics. Rather than reading all the entries from disk into memory (before this commit, we did consult the cache for most entries, skipping the disk hits), we now expose a chunked iterator instead. We also make the process of filtering for public nodes first class. This saves many newly created db transactions later.
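A simplified sketch of what such a batched iterator looks like, assuming a hypothetical fetchBatch callback in place of the real kv_store traversal (the actual code also uses a richer cursor rather than just the timestamp):

```go
package graphdb

import (
	"iter"
	"time"
)

// NodeUpdate is a stand-in for the node announcement record the store
// returns; the real type carries far more fields.
type NodeUpdate struct {
	PubKey     [33]byte
	LastUpdate time.Time
}

// nodeUpdatesInHorizon yields node updates whose LastUpdate falls within
// [start, end], reading them batchSize entries at a time. fetchBatch is a
// hypothetical callback standing in for the per-batch DB read; it returns
// the next batch after the cursor and whether more data remains.
func nodeUpdatesInHorizon(start, end time.Time, batchSize int,
	fetchBatch func(cursor time.Time, limit int) ([]NodeUpdate, bool, error),
	onErr func(error)) iter.Seq[NodeUpdate] {

	return func(yield func(NodeUpdate) bool) {
		cursor := start
		for {
			// Each batch is its own short DB read rather than one long
			// transaction spanning the whole horizon.
			batch, hasMore, err := fetchBatch(cursor, batchSize)
			if err != nil {
				// Later commits switch to iter.Seq2 so the error can be
				// yielded directly; here we just report and stop.
				onErr(err)
				return
			}

			for _, node := range batch {
				if node.LastUpdate.After(end) {
					return
				}
				if !yield(node) {
					// The caller broke out early, so no further batches
					// are fetched.
					return
				}
			}

			if !hasMore || len(batch) == 0 {
				return
			}

			// Advance the cursor past the last emitted entry. The real
			// implementation uses a compound cursor to avoid skipping
			// entries that share a timestamp.
			cursor = batch[len(batch)-1].LastUpdate
		}
	}
}
```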
In this commit, we refactor the ChanUpdatesInHorizon method to return an iterator instead of a slice. This change significantly reduces memory usage when dealing with large result sets by allowing callers to process items incrementally rather than loading everything into memory at once.
In this commit, we update the SQL store implementation to support the new iterator-based API for NodeUpdatesInHorizon. This includes adding a new SQL query that supports efficient pagination through result sets. The SQL implementation uses cursor-based pagination with configurable batch sizes, allowing efficient iteration over large result sets without loading everything into memory. The query is optimized to use indexes effectively and minimize database round trips. New SQL query GetNodesByLastUpdateRange is updated to support: * Cursor-based pagination using (last_update, pub_key) compound cursor * Optional filtering for public nodes only * Configurable batch sizes via MaxResults parameter
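To illustrate the cursor-based pagination idea, here is a hedged sketch using database/sql; the table, column names, and query text are simplified stand-ins for the real GetNodesByLastUpdateRange query, not the SQL shipped in the PR:

```go
package graphdb

import (
	"context"
	"database/sql"
	"time"
)

// nodeRow is one row of the paginated node-update query.
type nodeRow struct {
	PubKey     []byte
	LastUpdate time.Time
}

// queryNodePage fetches a single page of node updates ordered by the
// compound cursor (last_update, pub_key), resuming strictly after the
// previous page's final row.
func queryNodePage(ctx context.Context, db *sql.DB, start, end time.Time,
	cursorTime time.Time, cursorKey []byte, limit int) ([]nodeRow, error) {

	const q = `
		SELECT pub_key, last_update
		FROM nodes
		WHERE last_update >= $1 AND last_update <= $2
		  AND (last_update > $3 OR (last_update = $3 AND pub_key > $4))
		ORDER BY last_update, pub_key
		LIMIT $5`

	rows, err := db.QueryContext(
		ctx, q, start.Unix(), end.Unix(), cursorTime.Unix(),
		cursorKey, limit,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var page []nodeRow
	for rows.Next() {
		var (
			row  nodeRow
			unix int64
		)
		if err := rows.Scan(&row.PubKey, &unix); err != nil {
			return nil, err
		}
		row.LastUpdate = time.Unix(unix, 0)
		page = append(page, row)
	}

	return page, rows.Err()
}
```

Each page resumes strictly after the previous page's (last_update, pub_key) pair, so no offset scanning is needed and the ORDER BY can be served by an index.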
In this commit, we update the SQL store implementation to support the new iterator-based API for ChanUpdatesInHorizon. This includes adding SQL query pagination support and helper functions for efficient batch processing. The SQL implementation uses cursor-based pagination with configurable batch sizes, allowing efficient iteration over large result sets without loading everything into memory. The query is optimized to use indexes effectively and minimize database round trips. New SQL query GetChannelsByPolicyLastUpdateRange is updated to support: - Cursor-based pagination using (max_update_time, id) compound cursor - Configurable batch sizes via MaxResults parameter - Efficient batch caching with updateChanCacheBatch helper
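A minimal sketch of the per-batch cache update idea behind a helper like updateChanCacheBatch, with simplified types (this is an assumption about the shape of the code, not the literal implementation):

```go
package graphdb

import "sync"

// cachedEdge is a stand-in for the channel-update record kept in the cache.
type cachedEdge struct {
	ChannelID uint64
}

// chanCache is a minimal stand-in for the graph's channel cache.
type chanCache struct {
	mu      sync.Mutex
	entries map[uint64]cachedEdge
}

// updateBatch inserts one freshly fetched batch of edges into the cache.
// Because the mutex is only held for the duration of a single batch, other
// readers are never blocked for the length of the whole multi-batch scan,
// which is the contention reduction described in the commit message.
func (c *chanCache) updateBatch(batch []cachedEdge) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, edge := range batch {
		c.entries[edge.ChannelID] = edge
	}
}
```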
In this commit, we update all callers of NodeUpdatesInHorizon and ChanUpdatesInHorizon to use the new iterator-based APIs. The changes use fn.Collect to maintain existing behavior while benefiting from the memory efficiency of iterators when possible.
…e, error] In this commit, we complete the iterator conversion work started in PR 10128 by threading the iterator pattern through to the higher-level UpdatesInHorizon method. This change converts the method from returning a fully materialized slice of messages to returning a lazy iterator that yields messages on demand. The new signature uses iter.Seq2 to allow error propagation during iteration, eliminating the need for a separate error return value. This approach enables callers to handle errors as they occur during iteration rather than failing upfront. The implementation now lazily processes channel and node updates, yielding them as they're generated rather than accumulating them in memory. This maintains the same ordering guarantees (channels before nodes) while significantly reducing memory pressure when dealing with large update sets during gossip synchronization.
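A sketch of the iter.Seq2-based shape described here, with a simple struct standing in for lnwire.Message; it shows how a mid-iteration failure is surfaced through the second yielded value instead of a separate error return:

```go
package gossip

import (
	"errors"
	"fmt"
	"iter"
)

// message stands in for lnwire.Message in this sketch.
type message struct {
	name string
}

// updatesInHorizon lazily yields messages and uses the second value of
// iter.Seq2 to carry any error hit mid-iteration, so the method itself
// needs no separate error return.
func updatesInHorizon(batches [][]message) iter.Seq2[message, error] {
	return func(yield func(message, error) bool) {
		for i, batch := range batches {
			if batch == nil {
				// Simulate a mid-iteration failure: hand the error to
				// the caller through the yielded value and stop.
				yield(message{}, fmt.Errorf("failed reading batch %d", i))
				return
			}
			for _, msg := range batch {
				if !yield(msg, nil) {
					return
				}
			}
		}
	}
}

// consume shows the caller side: errors are handled as they occur during
// iteration rather than up front.
func consume(updates iter.Seq2[message, error]) error {
	for msg, err := range updates {
		if err != nil {
			return errors.Join(errors.New("horizon iteration"), err)
		}
		fmt.Println("send", msg.name)
	}
	return nil
}
```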
In this commit, we update ApplyGossipFilter to leverage the new iterator-based UpdatesInHorizon method. The key innovation here is using iter.Pull2 to create a pull-based iterator that allows us to check if any updates exist before launching the background goroutine. This approach provides several benefits over the previous implementation. First, we avoid the overhead of launching a goroutine when there are no updates to send, which was previously unavoidable without materializing the entire result set. Second, we maintain lazy loading throughout the sending process, only pulling messages from the database as they're needed for transmission. The implementation uses Pull2 to peek at the first message, determining whether to proceed with sending updates. If updates exist, ownership of the iterator is transferred to the goroutine, which continues pulling and sending messages until exhausted. This design ensures memory usage remains bounded regardless of the number of updates being synchronized.
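The Pull2 peek-then-hand-off pattern reads roughly like this sketch (the gossiper plumbing, rate limiting, and quit handling are elided, and the send callback is a stand-in for writing to the peer):

```go
package gossip

import (
	"iter"
	"log"
)

// sendUpdates converts the push-style iterator into a pull-style one with
// iter.Pull2, peeks at the first message to decide whether any work exists,
// and only then launches the goroutine that owns the rest of the iteration.
// It reports whether a sender goroutine was started.
func sendUpdates(updates iter.Seq2[string, error],
	send func(string) error) bool {

	next, stop := iter.Pull2(updates)

	first, err, ok := next()
	if !ok || err != nil {
		// No updates (or the very first read failed), so we avoid
		// spinning up a goroutine entirely.
		stop()
		return false
	}

	// Ownership of next/stop is transferred to the goroutine, which keeps
	// pulling lazily until the iterator is exhausted, keeping memory
	// bounded regardless of backlog size.
	go func() {
		defer stop()

		if err := send(first); err != nil {
			log.Printf("unable to send update: %v", err)
			return
		}

		for {
			msg, err, ok := next()
			if !ok {
				return
			}
			if err != nil {
				log.Printf("reading updates in horizon failed: %v", err)
				return
			}
			if err := send(msg); err != nil {
				log.Printf("unable to send update: %v", err)
				return
			}
		}
	}()

	return true
}
```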
In this commit, we update the mockChannelGraphTimeSeries to implement the new iterator-based UpdatesInHorizon interface. The mock maintains its existing behavior of receiving messages through a channel and returning them to the caller, but now wraps this in an iterator function. The implementation creates an iterator that pulls the entire message slice from the mock's response channel, then yields each message individually. This preserves the test semantics while conforming to the new interface, ensuring all existing tests continue to pass without modification.
…r.Seq2 This lets us emit a rich error if things fail when first creating the iterator, or if any of the yield attempts fail.
Force-pushed from d3c57c9 to 506c3d5.
Haven't had an excuse to make an iterator yet, so I nerd-sniped myself into the creation of this PR.
This PR does a few things:
The cache changes now mean that an invocation doesn't have a consistent view of the cache, but for cases like this (serving gossip data to peers, which can be lossy), we don't really need a consistent snapshot. This change should reduce overall mutex contention as well.